
flo_ai aws bedrock integration #228

Open
rootflo-hardik wants to merge 1 commit into develop from
CU-86d233u6w-support-for-aws-bedrock-llm

Conversation

@rootflo-hardik (Contributor) commented Feb 25, 2026

Summary by CodeRabbit

New Features

  • AWS Bedrock is now available as an LLM provider with full integration
  • Supports both text generation and streaming response modes
  • Enables function calling and tool integration capabilities
  • Includes image message handling support for multimodal interactions

    if content:
        buffer += content
        continue
except json.JSONDecodeError:
Member

Please address this; we shouldn't do pass without a log

    )
    if content:
        buffer += content
except json.JSONDecodeError:
Member


Same here

coderabbitai bot commented Feb 25, 2026

📝 Walkthrough

Walkthrough

This pull request introduces AWS Bedrock LLM integration to the flo_ai library. A new AWSBedrock class is created extending BaseLLM, providing generation, streaming, message conversion, tool formatting, and image handling capabilities for Bedrock models. The module is then exported through the llm package's public API.

Changes

  • AWS Bedrock LLM Implementation (flo_ai/flo_ai/llm/aws_bedrock_llm.py): New AWSBedrock class with async generation and streaming methods, message-to-Bedrock conversion, tool/function formatting utilities, image payload handling, reasoning block stripping, and telemetry integration.
  • LLM Package Exports (flo_ai/flo_ai/llm/__init__.py): Added AWSBedrock import and export to make the new integration available in the public llm namespace.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant AWSBedrock
    participant BedrockRuntime as Bedrock Runtime
    participant ToolHandler as Tool Handler

    Client->>AWSBedrock: generate(messages, functions, output_schema)
    AWSBedrock->>AWSBedrock: _convert_messages(messages, output_schema)
    AWSBedrock->>AWSBedrock: format_tools_for_llm(functions)
    AWSBedrock->>BedrockRuntime: invoke_model(request)
    BedrockRuntime-->>AWSBedrock: response (content + usage)
    AWSBedrock->>AWSBedrock: _strip_reasoning(content)
    alt Tool Call Detected
        AWSBedrock->>ToolHandler: get_assistant_message_for_tool_call(response)
        ToolHandler-->>AWSBedrock: tool_call payload
    else Text Response
        AWSBedrock->>AWSBedrock: get_message_content(response)
    end
    AWSBedrock-->>Client: response (text or tool_call)
sequenceDiagram
    participant Client
    participant AWSBedrock
    participant BedrockRuntime as Bedrock Runtime

    Client->>AWSBedrock: stream(messages, functions)
    AWSBedrock->>AWSBedrock: _convert_messages(messages)
    AWSBedrock->>AWSBedrock: format_tools_for_llm(functions)
    AWSBedrock->>BedrockRuntime: invoke_model_with_response_stream(request)
    BedrockRuntime-->>AWSBedrock: event stream
    loop Process Events
        AWSBedrock->>AWSBedrock: parse chunk/SSE data
        AWSBedrock->>AWSBedrock: accumulate content
    end
    AWSBedrock->>AWSBedrock: _strip_reasoning(accumulated_content)
    AWSBedrock-->>Client: yield final content

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Poem

🐰 A Bedrock foundation, now laid,
With streaming and tools, a parade!
AWS channels open wide,
Messages flow, no reasoning to hide,
The warren's LLM skills have swayed! ✨

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 0.00%, which is below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (2 passed)

  • Description Check: ✅ Passed. Check skipped because CodeRabbit's high-level summary is enabled.
  • Title check: ✅ Passed. The title accurately describes the main change: adding AWS Bedrock integration to the flo_ai LLM module, which is clearly reflected in the new AWSBedrock class and package exports.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch CU-86d233u6w-support-for-aws-bedrock-llm

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

coderabbitai bot left a comment

Actionable comments posted: 3

♻️ Duplicate comments (1)
flo_ai/flo_ai/llm/aws_bedrock_llm.py (1)

177-178: Empty except blocks silently swallow errors — add debug logging.

Both except json.JSONDecodeError: pass blocks silently discard parse failures with no trace. Add a module-level logger and log at debug level for observability. This was already flagged in a prior review.

Also applies to: 190-191
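A minimal sketch of the suggested pattern. The function name `parse_delta` and the payload shape are illustrative (taken from the streaming snippets quoted in this review), not part of the PR:

```python
import json
import logging
from typing import Optional

logger = logging.getLogger(__name__)

def parse_delta(text: str) -> Optional[str]:
    """Parse one chunk of an OpenAI-style streaming payload.

    Returns the content delta, or None when the chunk is not valid
    JSON. Illustrative helper, not code from the PR.
    """
    try:
        data = json.loads(text)
    except json.JSONDecodeError as e:
        # Log at debug level instead of silently swallowing the failure.
        logger.debug('Failed to decode JSON chunk: %s', e)
        return None
    return data.get('choices', [{}])[0].get('delta', {}).get('content')
```

The key point is capturing the exception as a variable and emitting it through a module-level logger, so parse failures remain observable without changing the fallback behavior.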

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flo_ai/flo_ai/llm/aws_bedrock_llm.py` around lines 177 - 178, The two empty
except blocks catching json.JSONDecodeError in the aws_bedrock_llm module should
not swallow errors silently: add a module-level logger (import logging; logger =
logging.getLogger(__name__)) and in each except json.JSONDecodeError block log
the failure at debug level, e.g. logger.debug("Failed to decode JSON in
<function_or_method_name>: %s", e) while capturing the exception as a variable
(except json.JSONDecodeError as e), so the parse errors in the aws_bedrock_llm
module (e.g., in the AWSBedrockLLM class methods where the JSON is parsed) are
visible for debugging.
🧹 Nitpick comments (2)
flo_ai/flo_ai/llm/aws_bedrock_llm.py (2)

35-37: Use explicit Optional type annotation (PEP 484).

Static analysis flags the implicit Optional on output_schema: dict = None. Per PEP 484, use an explicit union type.

♻️ Proposed fix
     def _convert_messages(
-        self, messages: list[dict], output_schema: dict = None
+        self, messages: list[dict], output_schema: Optional[dict] = None
     ) -> list[dict]:
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flo_ai/flo_ai/llm/aws_bedrock_llm.py` around lines 35 - 37, The parameter
output_schema in method _convert_messages should use an explicit Optional type
per PEP 484; import Optional from typing and change the signature of
_convert_messages to use output_schema: Optional[dict] = None (keeping return
type list[dict] unchanged) so static analyzers no longer treat the None default
as an implicit Optional; update any related type hints in that scope if present.

110-135: Simplify the content-existence check (Ruff RUF019).

Line 115 performs an unnecessary key check before accessing. message.get('content') already returns None if the key is absent, so the truthiness check covers both cases.

♻️ Proposed fix
-        if 'content' in message and message['content']:
-            message['content'] = self._strip_reasoning(message['content'])
+        if message.get('content'):
+            message['content'] = self._strip_reasoning(message['content'])
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flo_ai/flo_ai/llm/aws_bedrock_llm.py` around lines 110 - 135, The check "if
'content' in message and message['content']" is redundant; replace it by reading
content = message.get('content') and only call self._strip_reasoning when
content is truthy, e.g. if content: message['content'] =
self._strip_reasoning(content). This simplifies the logic around the local
variable message in the AWS Bedrock response parsing (refer to the message
variable and the _strip_reasoning method) and preserves the later use of
message.get('content', '') to compute text_content.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@flo_ai/flo_ai/llm/aws_bedrock_llm.py`:
- Around line 163-195: The stream() method currently accumulates all deltas into
a local buffer and yields only once; change it to yield each content delta as it
is parsed so consumers receive streaming updates: offload the blocking iteration
over response['body'] (which comes from invoke_model_with_response_stream) to a
thread or executor (e.g., asyncio.to_thread/run_in_executor) so the event loop
isn't blocked, parse each event/chunk as it arrives, apply self._strip_reasoning
to each incremental piece (or accumulate minimally and strip per-yield if
needed) and yield {'content': cleaned_chunk} immediately for every non-empty
content delta instead of returning a single final yield.
- Around line 20-29: The __init__ in AWS Bedrock LLM is missing a production
dependency and contains a suspicious default model plus redundant assignments:
add "boto3>=1.36.1,<2" to the package's main dependencies in pyproject.toml so
boto3.client('bedrock-runtime') in the __init__ won't raise ImportError; replace
the default model string 'openai.gpt-oss-20b-1:0' in the AWSBedrock __init__
signature with a valid Bedrock model ID (e.g., 'amazon.titan-text-express-v1')
or document the intentional placeholder in the constructor docstring; and remove
the redundant assignments to self.model and self.kwargs in AWSBedrock.__init__
since BaseLLM.__init__ already sets them.
- Around line 63-88: The AWSBedrock class is implemented to talk to
OpenAI-compatible Bedrock models (generate uses OpenAI-style request/response
shapes and boto_client.invoke_model), so either (A) make this explicit by
updating the class docstring of AWSBedrock (or rename the class to
AWSBedrockOpenAICompat) and keep the current OpenAI-compatible behavior, or (B)
migrate generate to use Bedrock's broader Converse API and model-specific
request/response handling (replace current request_body keys like 'model',
'messages', 'temperature', 'max_completion_tokens', 'tools' and adjust response
parsing that expects choices[].message). In either path remove the redundant
'model' key from request_body (you already pass modelId=self.model to
boto_client.invoke_model) and update generate (and any usage of
_convert_messages and response parsing) so behavior matches the chosen
compatibility scope.

---

Duplicate comments:
In `@flo_ai/flo_ai/llm/aws_bedrock_llm.py`:
- Around line 177-178: The two empty except blocks catching json.JSONDecodeError
in the aws_bedrock_llm module should not swallow errors silently: add a
module-level logger (import logging; logger = logging.getLogger(__name__)) and
in each except json.JSONDecodeError block log the failure at debug level, e.g.
logger.debug("Failed to decode JSON in <function_or_method_name>: %s", e) while
capturing the exception as a variable (except json.JSONDecodeError as e), so the
parse errors in the aws_bedrock_llm module (e.g., in the AWSBedrockLLM class
methods where the JSON is parsed) are visible for debugging.

---

Nitpick comments:
In `@flo_ai/flo_ai/llm/aws_bedrock_llm.py`:
- Around line 35-37: The parameter output_schema in method _convert_messages
should use an explicit Optional type per PEP 484; import Optional from typing
and change the signature of _convert_messages to use output_schema:
Optional[dict] = None (keeping return type list[dict] unchanged) so static
analyzers no longer treat the None default as an implicit Optional; update any
related type hints in that scope if present.
- Around line 110-135: The check "if 'content' in message and
message['content']" is redundant; replace it by reading content =
message.get('content') and only call self._strip_reasoning when content is
truthy, e.g. if content: message['content'] = self._strip_reasoning(content).
This simplifies the logic around the local variable message in the AWS Bedrock
response parsing (refer to the message variable and the _strip_reasoning method)
and preserves the later use of message.get('content', '') to compute
text_content.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6532504 and e195dc9.

📒 Files selected for processing (2)
  • flo_ai/flo_ai/llm/__init__.py
  • flo_ai/flo_ai/llm/aws_bedrock_llm.py

Comment on lines +20 to +29
def __init__(
    self,
    model: str = 'openai.gpt-oss-20b-1:0',
    temperature: float = 0.7,
    **kwargs,
):
    super().__init__(model=model, temperature=temperature, **kwargs)
    self.boto_client = boto3.client('bedrock-runtime')
    self.model = model
    self.kwargs = kwargs
coderabbitai bot commented Feb 25, 2026

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check for setup.py and pyproject.toml, and look for boto3 dependency
echo "=== Looking for setup.py ==="
find . -maxdepth 3 -name "setup.py" -type f | head -5

echo -e "\n=== Looking for pyproject.toml ==="
find . -maxdepth 3 -name "pyproject.toml" -type f | head -5

echo -e "\n=== Looking for setup.cfg ==="
find . -maxdepth 3 -name "setup.cfg" -type f | head -5

echo -e "\n=== Checking boto3 in requirements files ==="
find . -maxdepth 3 \( -name "*requirements*.txt" -o -name "setup.py" -o -name "pyproject.toml" -o -name "setup.cfg" \) -type f | xargs grep -l boto3 2>/dev/null | head -10

Repository: rootflo/wavefront

Length of output: 302


🏁 Script executed:

#!/bin/bash
# Read the aws_bedrock_llm.py file to see the full context
echo "=== Reading aws_bedrock_llm.py ==="
cat -n flo_ai/flo_ai/llm/aws_bedrock_llm.py | head -50

Repository: rootflo/wavefront

Length of output: 1890


🏁 Script executed:

#!/bin/bash
# Check the BaseLLM class to understand what super().__init__() does
rg -n "class BaseLLM" -A 30 --type=py

Repository: rootflo/wavefront

Length of output: 4198


🏁 Script executed:

#!/bin/bash
# Read the pyproject.toml to check if boto3 is in install_requires
echo "=== Checking flo_ai/pyproject.toml for boto3 ==="
cat flo_ai/pyproject.toml | grep -A 20 -B 2 "install_requires\|dependencies"

echo -e "\n=== Full pyproject.toml if needed ==="
cat flo_ai/pyproject.toml

Repository: rootflo/wavefront

Length of output: 3205


🏁 Script executed:

#!/bin/bash
# Also check setup.py if it exists
echo "=== Checking flo_ai/setup.py ==="
cat flo_ai/setup.py | grep -A 30 "install_requires"

Repository: rootflo/wavefront

Length of output: 120


🏁 Script executed:

#!/bin/bash
# Search for boto3 mentions to understand dependency status
echo "=== Looking for boto3 references in config files ==="
grep -r "boto3" flo_ai/ --include="*.toml" --include="*.py" --include="*.txt" --include="*.cfg" 2>/dev/null | grep -v "__pycache__" | grep -v ".pyc" | head -20

Repository: rootflo/wavefront

Length of output: 305


Add boto3 to production dependencies and fix suspicious default model ID.

  1. boto3 is missing from the production dependencies list in pyproject.toml. It's currently only in the [dependency-groups] dev section. Users installing this package will get an ImportError at runtime when trying to use AWSBedrock. Add "boto3>=1.36.1,<2" to the main dependencies list.

  2. The default model 'openai.gpt-oss-20b-1:0' does not match AWS Bedrock model ID patterns (e.g., anthropic.claude-3-sonnet-20240229-v1:0, amazon.titan-text-express-v1). This appears to be a placeholder or incorrect value. Provide a valid Bedrock model ID or document why this unusual format is intentional.

  3. Minor: self.model and self.kwargs on lines 28–29 are redundantly assigned; BaseLLM.__init__() already sets these (harmless but unnecessary).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flo_ai/flo_ai/llm/aws_bedrock_llm.py` around lines 20 - 29, The __init__ in
AWS Bedrock LLM is missing a production dependency and contains a suspicious
default model plus redundant assignments: add "boto3>=1.36.1,<2" to the
package's main dependencies in pyproject.toml so boto3.client('bedrock-runtime')
in the __init__ won't raise ImportError; replace the default model string
'openai.gpt-oss-20b-1:0' in the AWSBedrock __init__ signature with a valid
Bedrock model ID (e.g., 'amazon.titan-text-express-v1') or document the
intentional placeholder in the constructor docstring; and remove the redundant
assignments to self.model and self.kwargs in AWSBedrock.__init__ since
BaseLLM.__init__ already sets them.

Member


1. You need to add it; the current code only works because some of our libraries have a boto3 dependency internally.
3. Check point 3 as well.


🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check BaseLLM.__init__() to see what it sets
echo "=== Checking BaseLLM.__init__() implementation ==="
rg -n "class BaseLLM" -A 40 --type=py flo_ai/flo_ai/llm/base_llm.py | head -50

Length of output: 1513


@vizsatiz, confirmed on both points:

  1. boto3 dependency: You're right that it should be explicitly added to the production dependencies list in pyproject.toml, even though it currently works through transitive dependencies. Add:

    "boto3>=1.36.1,<2"
  2. Redundant assignments on lines 28-29: Confirmed redundant. BaseLLM.__init__() already sets both self.model and self.kwargs (lines 17 and 20 in base_llm.py). These lines can be safely removed:

def __init__(
    self,
    model: str = 'openai.gpt-oss-20b-1:0',
    temperature: float = 0.7,
    **kwargs,
):
    super().__init__(model=model, temperature=temperature, **kwargs)
    self.boto_client = boto3.client('bedrock-runtime')
    # Remove these two lines:
    # self.model = model
    # self.kwargs = kwargs

✏️ Learnings added
Learnt from: vizsatiz
Repo: rootflo/wavefront PR: 228
File: flo_ai/flo_ai/llm/aws_bedrock_llm.py:20-29
Timestamp: 2026-02-26T09:29:30.130Z
Learning: In the `flo_ai` package, boto3 is currently available through transitive dependencies from other internal libraries, but should be added explicitly to the production dependencies in pyproject.toml for the AWS Bedrock integration.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

Comment on lines +63 to +88
@trace_llm_call(provider='bedrock')
async def generate(
    self,
    messages: list[dict],
    functions: Optional[List[Dict[str, Any]]] = None,
    output_schema: Optional[Dict[str, Any]] = None,
    **kwargs,
) -> Any:
    converted = self._convert_messages(messages, output_schema)

    request_body: Dict[str, Any] = {
        'model': self.model,
        'messages': converted,
        'temperature': self.temperature,
    }
    if 'max_tokens' in self.kwargs:
        request_body['max_completion_tokens'] = self.kwargs['max_tokens']
    if functions:
        request_body['tools'] = functions

    response = await asyncio.to_thread(
        self.boto_client.invoke_model,
        modelId=self.model,
        body=json.dumps(request_body),
    )
    response_body = json.loads(response['body'].read().decode('utf-8'))
coderabbitai bot commented Feb 25, 2026

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

AWS Bedrock invoke_model OpenAI compatible profiles 2025

💡 Result:

In Amazon Bedrock, “profiles” in the context of InvokeModel typically means Inference Profiles (cross‑region or application profiles), and they’re usable from both native Bedrock APIs and (in many cases) OpenAI‑compatible APIs.

  • InvokeModel supports inference profiles directly: the modelId path parameter can be a base model ID/ARN or an inference profile ID/ARN. The API reference explicitly calls this out. [1]
  • OpenAI‑compatible endpoints (2025+):
    • bedrock-mantle.{region}.amazonaws.com: OpenAI‑compatible Responses API + Chat Completions API. [2]
    • bedrock-runtime.{region}.amazonaws.com: native InvokeModel/Converse, and also supports OpenAI‑compatible Chat Completions. [2]
  • OpenAI “GPT‑OSS” models in Bedrock (released Aug 5, 2025) have Bedrock model IDs like:
    • openai.gpt-oss-20b-1:0
    • openai.gpt-oss-120b-1:0 [3]
      They support InvokeModel and OpenAI‑compatible Chat Completions (and are also listed as supporting Responses in Bedrock’s API compatibility matrix). [3][4]
  • Responses API availability (announced Dec 4, 2025): Bedrock added OpenAI Responses API support on new OpenAI‑compatible service endpoints (i.e., bedrock-mantle). [5]
  • Example inference profile IDs you might pass as modelId to InvokeModel include global/us geo routing profiles (AWS provides lists and examples, e.g., global.amazon.nova-2-lite-v1:0). [6]

Sources:
[1] InvokeModel API (modelId can be an inference profile) https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_InvokeModel.html
[2] Bedrock endpoints (bedrock-runtime vs bedrock-mantle) https://docs.aws.amazon.com/bedrock/latest/userguide/endpoints.html
[3] OpenAI models in Bedrock (IDs + release date) https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters-openai.html
[4] Model/API compatibility matrix (Invoke/Chat Completions/Responses per model) https://docs.aws.amazon.com/bedrock/latest/userguide/models-api-compatibility.html
[5] AWS “What’s New” (Dec 4, 2025 Responses API on Bedrock) https://aws.amazon.com/about-aws/whats-new/2025/12/amazon-bedrock-responses-api-from-openai/
[6] Inference profiles support + example IDs https://docs.aws.amazon.com/bedrock/latest/userguide/inference-profiles-support.html


🌐 Web query:

AWS Bedrock Converse API vs invoke_model difference 2025

💡 Result:

In Amazon Bedrock Runtime, Converse is the newer “standard chat/messages” interface; InvokeModel is the original “raw model payload” interface. (Still true in 2025.)

Key differences

  • Abstraction / portability

    • Converse: consistent messages-based request/response across supported chat-capable models; you can generally switch models with minimal code changes. [1][2]
    • InvokeModel: you send the provider/model-specific JSON body (field names and shapes differ by model family), so switching models often means rewriting payload parsing/building. [3][4]
  • Conversation handling

    • Converse: designed for multi-turn; you pass structured messages (including history) in one standardized format. [1][2]
    • InvokeModel: can do multi-turn only if the specific model’s schema supports it (for example, Anthropic “messages” format), but it’s not standardized across providers. [3]
  • Tool use (function calling)

    • Converse: has first-class, standardized toolConfig/tool use for supported models. [1][2]
    • InvokeModel: tool/function calling is provider-specific (different field names/structures depending on model family). [4]
  • Modalities & “non-chat” workloads

    • InvokeModel: explicitly supports text, images, and embeddings (whatever the selected model exposes), using the model’s native schema. [3]
    • Converse: only works with models that support messages (chat-style). [2]
  • Structured outputs (JSON schema)

    • Both can do structured outputs, but the parameter locations/names differ; Converse uses Bedrock-standard fields, while InvokeModel varies by provider (and can vary again for open-weight models). [4]
  • Endpoints / operations

    • Converse: POST /model/{modelId}/converse (plus ConverseStream). [2]
    • InvokeModel: POST /model/{modelId}/invoke (plus streaming variant). [3]

Practical guidance

  • Prefer Converse/ConverseStream for chat apps, multi-turn assistants, tool/function calling, and easier model switching. [1][2]
  • Use InvokeModel/InvokeModelWithResponseStream when you need full provider-native control, or you’re calling non-messages models (for example embeddings or many image models). [3][4]

Sources: AWS “What’s New” (Converse API announcement) [1], Bedrock Runtime API refs for Converse [2] and InvokeModel [3], and AWS guidance comparing parameter formats (structured outputs / Converse vs InvokeModel) [4].


🌐 Web query:

AWS Bedrock invoke_model request body schema format

💡 Result:

InvokeModel does not have one universal JSON schema. The body is model-specific JSON (sent as raw bytes) and must match the inference parameters for the particular model/provider you’re invoking. You must set Content-Type: application/json (and typically Accept: application/json). (docs.aws.amazon.com)

Generic REST shape

  • Endpoint: POST /model/{modelId}/invoke
  • Headers: Content-Type: application/json, Accept: application/json
  • Body: JSON for that model (max request body size is very large, up to 25,000,000 bytes). (docs.aws.amazon.com)

Example request bodies (common ones)

Anthropic Claude (Messages API format)

{
  "anthropic_version": "bedrock-2023-05-31",
  "max_tokens": 1024,
  "messages": [
    { "role": "user", "content": [ { "type": "text", "text": "Hello world" } ] }
  ]
}

(docs.aws.amazon.com)

Amazon Titan Text (text generation)

{
  "inputText": "Write a short poem about apples.",
  "textGenerationConfig": {
    "maxTokenCount": 256,
    "temperature": 0,
    "topP": 0.9,
    "stopSequences": []
  }
}

(aws.amazon.com)

Amazon Titan Text Embeddings

{
  "inputText": "Text to embed"
}

(aws.amazon.com)

If you tell me the exact modelId (e.g., anthropic.claude-..., amazon.titan-..., stability.stable-diffusion-...), I can paste the exact request/response schema for that model from the “Inference request parameters…” reference. (docs.aws.amazon.com)



🏁 Script executed:

cd flo_ai && head -60 flo_ai/llm/aws_bedrock_llm.py | cat -n

Repository: rootflo/wavefront

Length of output: 2260


🏁 Script executed:

cd flo_ai && rg -A 5 -B 5 "class.*Bedrock" flo_ai/llm/aws_bedrock_llm.py

Repository: rootflo/wavefront

Length of output: 294


🏁 Script executed:

cd flo_ai && rg "invoke_model\|InvokeModel\|Converse\|modelId" flo_ai/llm/aws_bedrock_llm.py

Repository: rootflo/wavefront

Length of output: 43


🏁 Script executed:

cd flo_ai && sed -n '63,90p' flo_ai/llm/aws_bedrock_llm.py | cat -n

Repository: rootflo/wavefront

Length of output: 1226


🏁 Script executed:

cd flo_ai && rg -i "openai\|converse\|model.*compatibility\|supported.*model" flo_ai/llm/aws_bedrock_llm.py

Repository: rootflo/wavefront

Length of output: 43


🏁 Script executed:

cd flo_ai && grep -n "invoke_model\|ConversUrl" flo_ai/llm/aws_bedrock_llm.py

Repository: rootflo/wavefront

Length of output: 174


🏁 Script executed:

cd flo_ai && head -100 flo_ai/llm/aws_bedrock_llm.py | tail -40

Repository: rootflo/wavefront

Length of output: 1440


🏁 Script executed:

cd flo_ai && grep -n "class\|def\|#" flo_ai/llm/aws_bedrock_llm.py | head -30

Repository: rootflo/wavefront

Length of output: 861


This implementation is coupled to OpenAI-compatible Bedrock models, not general Bedrock support.

The request body uses OpenAI-style schema (model, messages, temperature, max_completion_tokens, tools) and response parsing expects choices[].message format. While the default model openai.gpt-oss-20b-1:0 is OpenAI-compatible and works correctly, the generic class name AWSBedrock implies broader Bedrock model support.

Users attempting to use native Bedrock models (e.g., anthropic.claude-3-5-sonnet-*, amazon.titan-text-*, meta.llama-*) will encounter failures, as those models use model-specific request/response schemas via invoke_model or require the standardized Converse API.

Recommended fixes:

  • Add a docstring explicitly documenting that this class only supports OpenAI-compatible Bedrock models, or
  • Rename to AWSBedrockOpenAICompat to clarify scope, or
  • Migrate to the Converse API for broader Bedrock model compatibility.

Also, the 'model' key in the request body (line 74) is required for OpenAI but redundant with the modelId parameter (line 85) and may cause issues if model-specific validation is added.
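For illustration, if option (B) were chosen, Bedrock's Converse API separates system prompts into their own parameter and wraps each message's content in a list of content blocks. A hypothetical conversion helper (not code from this PR; the shapes follow AWS's Converse documentation):

```python
def to_converse_messages(messages: list[dict]) -> tuple[list[dict], list[dict]]:
    """Split OpenAI-style messages into Converse (system, messages).

    Converse takes system prompts in a separate `system` parameter and
    expects each message's content as a list of content blocks.
    Hypothetical helper for illustration only.
    """
    system: list[dict] = []
    converted: list[dict] = []
    for msg in messages:
        if msg.get('role') == 'system':
            system.append({'text': msg.get('content', '')})
        else:
            converted.append({
                'role': msg['role'],
                'content': [{'text': msg.get('content', '')}],
            })
    return system, converted
```

The two return values would map to the `system` and `messages` arguments of boto3's `converse` call; tool configuration and response parsing would need analogous, Bedrock-standard handling.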

🧰 Tools
🪛 Ruff (0.15.2)

[warning] 69-69: Unused method argument: kwargs

(ARG002)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flo_ai/flo_ai/llm/aws_bedrock_llm.py` around lines 63 - 88, The AWSBedrock
class is implemented to talk to OpenAI-compatible Bedrock models (generate uses
OpenAI-style request/response shapes and boto_client.invoke_model), so either
(A) make this explicit by updating the class docstring of AWSBedrock (or rename
the class to AWSBedrockOpenAICompat) and keep the current OpenAI-compatible
behavior, or (B) migrate generate to use Bedrock's broader Converse API and
model-specific request/response handling (replace current request_body keys like
'model', 'messages', 'temperature', 'max_completion_tokens', 'tools' and adjust
response parsing that expects choices[].message). In either path remove the
redundant 'model' key from request_body (you already pass modelId=self.model to
boto_client.invoke_model) and update generate (and any usage of
_convert_messages and response parsing) so behavior matches the chosen
compatibility scope.

Member


This looks like a valid issue; maybe you can add a comment on whether it's true


🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Examine the full generate method and response parsing
cd flo_ai && sed -n '63,110p' flo_ai/llm/aws_bedrock_llm.py | cat -n

Length of output: 2124


🏁 Script executed:

#!/bin/bash
# Look at how the response is parsed after invoke_model
cd flo_ai && sed -n '85,130p' flo_ai/llm/aws_bedrock_llm.py | cat -n

Length of output: 2027


🏁 Script executed:

#!/bin/bash
# Check if there are any comments or docstrings explaining model compatibility
cd flo_ai && head -30 flo_ai/llm/aws_bedrock_llm.py | cat -n

Length of output: 1008


@vizsatiz Yes, the issue is valid. Here's the confirmation based on the code:

Evidence from the implementation:

  1. Request body uses OpenAI format (lines 74-81):

    request_body = {
        'model': self.model,
        'messages': converted,
        'temperature': self.temperature,
        'max_completion_tokens': ...,  # OpenAI-style
        'tools': functions
    }
  2. Response parsing expects OpenAI format (lines 88-105):

    choices = response_body.get('choices', [])
    message = choices[0].get('message', {})
    tool_calls = message.get('tool_calls', [])
    tc['function']['name']  # OpenAI tool call structure
  3. No documentation about model compatibility limits.

What breaks with native Bedrock models:

  • Anthropic Claude (anthropic.claude-3-5-sonnet-*): Expects {"anthropic_version": "...", "max_tokens": ..., "messages": [...]} and returns {"content": [...], "usage": {...}} directly
  • Amazon Titan (amazon.titan-text-*): Expects {"inputText": "...", "textGenerationConfig": {...}} and returns {"results": [...]}
  • Meta Llama (meta.llama-*): Has its own schema structure

Recommendation: Either add a clear docstring/comment stating "This class only supports OpenAI-compatible Bedrock models (openai.*)" or rename to AWSBedrockOpenAI to make the limitation explicit.
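The schema mismatch above can be made concrete. A minimal, illustrative sketch of per-family request building — the function name and exact field sets are assumptions drawn from this review thread, not flo_ai code:

```python
import json


def build_request_body(model_id: str, messages: list[dict],
                       temperature: float = 0.7, max_tokens: int = 512) -> str:
    """Illustrative only: each Bedrock model family expects a different
    native request schema, so one OpenAI-style body cannot be reused."""
    if model_id.startswith('openai.'):
        body = {
            'model': model_id,
            'messages': messages,
            'temperature': temperature,
            'max_completion_tokens': max_tokens,  # OpenAI-style key
        }
    elif model_id.startswith('anthropic.'):
        body = {
            'anthropic_version': 'bedrock-2023-05-31',
            'max_tokens': max_tokens,
            'messages': messages,
        }
    elif model_id.startswith('amazon.titan'):
        # Titan takes a single flattened prompt, not a message list
        prompt = '\n'.join(m['content'] for m in messages)
        body = {
            'inputText': prompt,
            'textGenerationConfig': {
                'temperature': temperature,
                'maxTokenCount': max_tokens,
            },
        }
    else:
        raise ValueError(f'Unsupported Bedrock model family: {model_id}')
    return json.dumps(body)
```

Even if only the `openai.*` branch is kept, raising on unknown prefixes makes the compatibility limit explicit instead of producing a Bedrock validation error at runtime.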

Comment on lines +163 to +195
        buffer = ''

        for event in response['body']:
            chunk_bytes = event.get('chunk', {}).get('bytes', b'')
            if not chunk_bytes:
                continue
            text = chunk_bytes.decode('utf-8').strip()
            # Try direct JSON first (some Bedrock models skip SSE envelope)
            try:
                data = json.loads(text)
                content = data.get('choices', [{}])[0].get('delta', {}).get('content')
                if content:
                    buffer += content
                continue
            except json.JSONDecodeError:
                pass
            # Fall back to SSE format: "data: {...}"
            for line in text.split('\n'):
                line = line.strip()
                if line.startswith('data: ') and line != 'data: [DONE]':
                    try:
                        data = json.loads(line[6:])
                        content = (
                            data.get('choices', [{}])[0].get('delta', {}).get('content')
                        )
                        if content:
                            buffer += content
                    except json.JSONDecodeError:
                        pass

        clean = self._strip_reasoning(buffer)
        if clean:
            yield {'content': clean}
@coderabbitai coderabbitai bot Feb 25, 2026

⚠️ Potential issue | 🔴 Critical

stream() accumulates all content and yields once — not actually streaming.

The method collects the entire response into buffer (lines 163–191), then yields a single chunk at the end (lines 193–195). This defeats the purpose of streaming, as callers will block until the full response is generated before receiving anything. Each content delta should be yielded as it arrives:

Additionally, the for event in response['body'] loop (line 165) runs synchronously on the async event loop thread, blocking it for the entire duration of the stream. Only the initial invoke_model_with_response_stream call is offloaded via asyncio.to_thread.

🐛 Proposed fix — yield incremental chunks and offload blocking iteration
-        buffer = ''
-
-        for event in response['body']:
-            chunk_bytes = event.get('chunk', {}).get('bytes', b'')
-            if not chunk_bytes:
-                continue
-            text = chunk_bytes.decode('utf-8').strip()
-            # Try direct JSON first (some Bedrock models skip SSE envelope)
-            try:
-                data = json.loads(text)
-                content = data.get('choices', [{}])[0].get('delta', {}).get('content')
-                if content:
-                    buffer += content
-                continue
-            except json.JSONDecodeError:
-                pass
-            # Fall back to SSE format: "data: {...}"
-            for line in text.split('\n'):
-                line = line.strip()
-                if line.startswith('data: ') and line != 'data: [DONE]':
-                    try:
-                        data = json.loads(line[6:])
-                        content = (
-                            data.get('choices', [{}])[0].get('delta', {}).get('content')
-                        )
-                        if content:
-                            buffer += content
-                    except json.JSONDecodeError:
-                        pass
-
-        clean = self._strip_reasoning(buffer)
-        if clean:
-            yield {'content': clean}
+        queue: asyncio.Queue = asyncio.Queue()
+
+        def _iter_events():
+            for event in response['body']:
+                chunk_bytes = event.get('chunk', {}).get('bytes', b'')
+                if chunk_bytes:
+                    queue.put_nowait(chunk_bytes)
+            queue.put_nowait(None)  # sentinel
+
+        loop = asyncio.get_event_loop()
+        loop.run_in_executor(None, _iter_events)
+
+        while True:
+            chunk_bytes = await queue.get()
+            if chunk_bytes is None:
+                break
+            text = chunk_bytes.decode('utf-8').strip()
+            content = None
+            try:
+                data = json.loads(text)
+                content = data.get('choices', [{}])[0].get('delta', {}).get('content')
+            except json.JSONDecodeError:
+                for line in text.split('\n'):
+                    line = line.strip()
+                    if line.startswith('data: ') and line != 'data: [DONE]':
+                        try:
+                            data = json.loads(line[6:])
+                            content = data.get('choices', [{}])[0].get('delta', {}).get('content')
+                        except json.JSONDecodeError:
+                            logger.debug("Skipping malformed SSE line: %s", line)
+            if content:
+                clean = self._strip_reasoning(content)
+                if clean:
+                    yield {'content': clean}

Note: This sketch uses a queue + executor to avoid blocking the event loop. The exact approach may need refinement depending on the framework's async patterns, but the key point is: yield each chunk as it arrives.
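On that thread-safety caveat: asyncio.Queue is not safe to call from a worker thread directly, so the hand-off should go through loop.call_soon_threadsafe. A self-contained sketch under that assumption — the helper name is illustrative, not the final flo_ai API:

```python
import asyncio
from typing import Any, AsyncIterator, Iterable


async def iter_blocking(source: Iterable[Any]) -> AsyncIterator[Any]:
    """Consume a blocking iterator (e.g. a Bedrock event stream) on a
    worker thread and yield items on the event loop as they arrive."""
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()
    done = object()  # sentinel marking end of stream

    def _pump() -> None:
        # Runs in a worker thread; asyncio.Queue methods must run on the
        # loop thread, hence the call_soon_threadsafe hand-off.
        try:
            for item in source:
                loop.call_soon_threadsafe(queue.put_nowait, item)
        finally:
            loop.call_soon_threadsafe(queue.put_nowait, done)

    pump = loop.run_in_executor(None, _pump)
    while True:
        item = await queue.get()
        if item is done:
            break
        yield item
    await pump  # re-raise any exception from the worker thread
```

stream() could then `async for` over `iter_blocking(response['body'])`, parse each chunk, and yield every non-empty delta immediately, keeping the event loop responsive for the whole stream.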

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flo_ai/flo_ai/llm/aws_bedrock_llm.py` around lines 163 - 195, The stream()
method currently accumulates all deltas into a local buffer and yields only
once; change it to yield each content delta as it is parsed so consumers receive
streaming updates: offload the blocking iteration over response['body'] (which
comes from invoke_model_with_response_stream) to a thread or executor (e.g.,
asyncio.to_thread/run_in_executor) so the event loop isn't blocked, parse each
event/chunk as it arrives, apply self._strip_reasoning to each incremental piece
(or accumulate minimally and strip per-yield if needed) and yield {'content':
cleaned_chunk} immediately for every non-empty content delta instead of
returning a single final yield.

Member

this too


@vizsatiz, understood! Would you like me to plan the code changes to implement proper streaming for the stream() method? I can help refactor it to:

  1. Yield content deltas incrementally as they arrive (instead of accumulating everything in a buffer)
  2. Offload the blocking iteration over response['body'] to avoid blocking the async event loop
  3. Apply _strip_reasoning() to each chunk appropriately

Just let me know and I'll break down the implementation for you! 🚀


💡 For code changes across multiple files, use @coderabbitai plan in an issue comment

Comment on lines +20 to +29
def __init__(
    self,
    model: str = 'openai.gpt-oss-20b-1:0',
    temperature: float = 0.7,
    **kwargs,
):
    super().__init__(model=model, temperature=temperature, **kwargs)
    self.boto_client = boto3.client('bedrock-runtime')
    self.model = model
    self.kwargs = kwargs
Member

1. You need to add the boto3 dependency explicitly; the current code only works because some of our libraries pull in boto internally.
3. Check point 3 as well.
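Beyond declaring boto3 in the package metadata, the import itself can fail loudly with an install hint instead of relying on a transitive install. A small sketch — `require_module` is a hypothetical helper, not existing flo_ai code:

```python
import importlib
from types import ModuleType


def require_module(name: str, hint: str) -> ModuleType:
    """Import a module or raise an ImportError that tells the user
    exactly how to install the missing provider dependency."""
    try:
        return importlib.import_module(name)
    except ImportError as exc:
        raise ImportError(f'{name} is required for this feature: {hint}') from exc


# Hypothetical wiring at the top of aws_bedrock_llm.py:
# boto3 = require_module('boto3', "pip install boto3, or add 'boto3' to flo_ai's dependencies")
```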

Comment on lines +63 to +88
@trace_llm_call(provider='bedrock')
async def generate(
    self,
    messages: list[dict],
    functions: Optional[List[Dict[str, Any]]] = None,
    output_schema: Optional[Dict[str, Any]] = None,
    **kwargs,
) -> Any:
    converted = self._convert_messages(messages, output_schema)

    request_body: Dict[str, Any] = {
        'model': self.model,
        'messages': converted,
        'temperature': self.temperature,
    }
    if 'max_tokens' in self.kwargs:
        request_body['max_completion_tokens'] = self.kwargs['max_tokens']
    if functions:
        request_body['tools'] = functions

    response = await asyncio.to_thread(
        self.boto_client.invoke_model,
        modelId=self.model,
        body=json.dumps(request_body),
    )
    response_body = json.loads(response['body'].read().decode('utf-8'))
Member

This looks like a valid issue; maybe you can add a comment on whether it's true.

                if content:
                    buffer += content
                continue
            except json.JSONDecodeError:
The reason will be displayed to describe this comment to others. Learn more.

Please address this; we shouldn't use a bare pass without logging.
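One way to honor this: route each chunk through a helper that logs the bad payload instead of swallowing it. A sketch assuming a standard `logging.getLogger` module logger; the helper name is illustrative:

```python
import json
import logging

logger = logging.getLogger(__name__)


def extract_delta(text: str):
    """Return the streamed content delta from one chunk, logging
    (rather than silently dropping) malformed JSON payloads."""
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        logger.warning('Skipping malformed Bedrock chunk: %.80r', text)
        return None
    return data.get('choices', [{}])[0].get('delta', {}).get('content')
```

The `%.80r` format caps the logged payload at 80 characters so one corrupt chunk cannot flood the logs.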

                        )
                        if content:
                            buffer += content
                    except json.JSONDecodeError:
The reason will be displayed to describe this comment to others. Learn more.

Same here

